{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 队列 - 生产者消费者模式"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```py\n",
    "import asyncio\n",
    "\n",
    "\n",
    "async def consumer(n, q):\n",
    "    print('consumer {}: starting'.format(n))\n",
    "    while True:\n",
    "        print('consumer {}: waiting for item'.format(n))\n",
    "        item = await q.get()\n",
    "        print('consumer {}: get item {}'.format(n, item))\n",
    "        if item is None:\n",
    "            # None is the signal to stop.\n",
    "            q.task_done()\n",
    "            break\n",
    "        else:\n",
    "            await asyncio.sleep(0.5 * item)\n",
    "            q.task_done()\n",
    "    print('consumer {}: ending'.format(n))\n",
    "\n",
    "\n",
    "async def producer(q, num_workers):\n",
    "    print('producer: starting')\n",
    "    # Add some numbers to the queue to simulate jobs\n",
    "    for i in range(num_workers * 3):\n",
    "        await q.put(i)\n",
    "        print('producer: added item {} to the queue'.format(i))\n",
    "        # Add None entries in the queue\n",
    "    # to signal the consumers to exit\n",
    "    print('producer: adding stop signals to the queue')\n",
    "    for i in range(num_workers):\n",
    "        await q.put(None)\n",
    "    print('producer: waiting for queue to empty')\n",
    "    await q.join()\n",
    "    print('producer: ending')\n",
    "\n",
    "\n",
    "async def main(loop, num_consumers):\n",
    "    # Create the queue with a fixed size so the producer\n",
    "    # will block until the consumers pull some items out.\n",
    "    q = asyncio.Queue(maxsize=num_consumers)\n",
    "\n",
    "    # Scheduled the consumer tasks.\n",
    "    consumers = [loop.create_task(consumer(i + 1, q)) for i in range(num_consumers)]\n",
    "\n",
    "    # Schedule the producer task.\n",
    "    prod = loop.create_task(producer(q, num_consumers))\n",
    "\n",
    "    # Wait for all of the coroutines to finish.\n",
    "    await asyncio.wait([*consumers, prod])\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "\n",
    "    event_loop = asyncio.get_event_loop()\n",
    "    try:\n",
    "        event_loop.run_until_complete(main(event_loop, 3))\n",
    "    finally:\n",
    "        event_loop.close()\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": false
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
